 1.车辆监控之轨迹数据处理之安装redis
   
   1).下载redis安装包
  在linux123服务器执行以下命令下载redis安装包（注意：后文的 bind 配置与 redis-cli 连接均使用 linux123 主机名，需保持一致）
   mkdir -p /opt/lagou/softwares
   mkdir -p /opt/lagou/servers
   cd /opt/lagou/softwares
   wget http://download.redis.io/releases/redis-3.2.8.tar.gz
   2).解压redis压缩包
   cd /opt/lagou/softwares
   tar -zxvf redis-3.2.8.tar.gz -C ../servers/
   3).安装运行环境
   yum -y install gcc-c++
   yum -y install tcl
   4).编译安装
   cd /opt/lagou/servers/redis-3.2.8/
   yum reinstall binutils -y
   make MALLOC=libc 
   make && make install
   5).修改redis配置文件
   修改redis配置文件
   cd /opt/lagou/servers/redis-3.2.8/
   mkdir -p /opt/lagou/servers/redis-3.2.8/logs
   mkdir -p /opt/lagou/servers/redis-3.2.8/redisdata
   
   vim redis.conf
bind linux123
daemonize yes
pidfile /var/run/redis_6379.pid
logfile "/opt/lagou/servers/redis-3.2.8/logs/redis.log"
dir /opt/lagou/servers/redis-3.2.8/redisdata
   6).启动redis  
   启动redis
   cd /opt/lagou/servers/redis-3.2.8/src
   redis-server ../redis.conf
   7).连接redis客户端
   cd /opt/lagou/servers/redis-3.2.8/src
   redis-cli -h linux123

 2.轨迹数据写入Redis
   
        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
   
   RedisWriter
package com.lg.monitor

import com.lg.bean.BusInfo
import org.apache.spark.sql.ForeachWriter
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisWriter {
  // Pool settings: bounded so many stream partitions cannot exhaust the Redis server.
  private val poolConfig: JedisPoolConfig = {
    val c = new JedisPoolConfig
    c.setMaxTotal(20) // maximum simultaneous connections
    c.setMaxIdle(10)  // maximum idle connections kept in the pool
    c
  }

  // One shared pool per JVM, created lazily on first reference of this object.
  private val jedisPool = new JedisPool(poolConfig, "linux123", 6379, 1000)

  /** Borrow a Jedis connection from the pool; the caller must close() it to return it. */
  def getConnection: Jedis = jedisPool.getResource
}
/**
 * ForeachWriter that persists each bus record to Redis as key = deployNum, value = lglat.
 * Structured Streaming lifecycle per partition/epoch: open -> process* -> close.
 */
class RedisWriter extends ForeachWriter[BusInfo] {

  var jedis: Jedis = _

  /** Borrow a pooled connection for this partition; returning true enables process(). */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // BUG FIX: the borrowed connection was previously discarded (not assigned),
    // leaving `jedis` null and causing an NPE on the first process() call.
    jedis = RedisWriter.getConnection
    true
  }

  /** Write one record: SET deployNum -> lglat, so the latest position wins per vehicle. */
  override def process(value: BusInfo): Unit = {
    jedis.set(value.deployNum, value.lglat)
  }

  /** Return the connection to the pool (close() on a pooled Jedis releases, not destroys). */
  override def close(errorOrNull: Throwable): Unit = {
    // Guard against open() having failed before the assignment.
    if (jedis != null) jedis.close()
  }
}


package com.lg.bean
//接收各个字段
/**
 * One vehicle telemetry record; all fields are kept as the raw strings
 * taken from the comma-separated Kafka message (no parsing/validation here).
 */
case class BusInfo(
                  deployNum:String,
                  simNum:String,
                  transportNum:String,
                  plateNum:String,
                  lglat:String,
                  speed:String,
                  direction:String,
                  mileage:String,
                  timeStr:String,
                  oilRemain:String,
                  weights:String,
                  acc:String,
                  locate:String,
                  oilWay:String,
                  electric:String
                  )
object BusInfo {
  /**
   * Parse a comma-separated message (15 fields, in declaration order) into a BusInfo.
   *
   * @param msg raw Kafka message value, e.g. "deployNum,simNum,...,electric"
   * @throws IllegalArgumentException if the message has fewer than 15 fields
   */
  def apply(msg: String): BusInfo = {
    // BUG FIX: split(",") silently DROPS trailing empty fields, so a message whose
    // last field(s) are empty (e.g. "...,oilWay,,") yielded a short array and an
    // ArrayIndexOutOfBoundsException. limit = -1 keeps trailing empty strings.
    val arr: Array[String] = msg.split(",", -1)
    require(arr.length >= 15, s"expected 15 comma-separated fields, got ${arr.length}")
    BusInfo(
      arr(0),
      arr(1),
      arr(2),
      arr(3),
      arr(4),
      arr(5),
      arr(6),
      arr(7),
      arr(8),
      arr(9),
      arr(10),
      arr(11),
      arr(12),
      arr(13),
      arr(14)
    )
  }
}

package com.lg.monitor

import com.lg.bean.BusInfo
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * 使用结构化流读取kafka中的数据
 */
/**
 * Structured Streaming job: consumes vehicle telemetry from Kafka topic
 * "lagou_bus_info", parses each message into a BusInfo, and streams the
 * latest coordinates into Redis via RedisWriter.
 */
object RealTimeProcess {
  def main(args: Array[String]): Unit = {
    // 1. Spark session (local mode for development).
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName(RealTimeProcess.getClass.getName)
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    // 2. Kafka source.
    // NOTE(review): broker hosts read "linxu122/linxu123" — looks like a typo for
    // "linux122/linux123" (the hosts used elsewhere in this project); confirm
    // against the actual cluster before changing.
    val kafkaDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "linxu122:9092,linxu123:9092")
      .option("subscribe", "lagou_bus_info")
      .load()

    // 3. Kafka value bytes -> String -> BusInfo case class
    //    (the case class makes individual fields addressable downstream).
    val kafkaValDf: DataFrame = kafkaDF.selectExpr("CAST(value AS STRING)")
    val kafkaDs: Dataset[String] = kafkaValDf.as[String]
    val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_))

    // 4. Sink: write each record's deployNum -> lglat pair to Redis and block forever.
    busInfoDs.writeStream
      .foreach(new RedisWriter)
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

 3.轨迹数据写入Hbase
   创建表
   create 'htb_gps','car_info'
    <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>2.6.0-mr1-cdh5.14.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.hbase</groupId>
     <artifactId>hbase-client</artifactId>
     <version>1.2.0-cdh5.14.0</version>
   </dependency>
   <dependency>
     <groupId>org.apache.hbase</groupId>
     <artifactId>hbase-server</artifactId>
     <version>1.2.0-cdh5.14.0</version>
   </dependency>

package com.lg.monitor

import com.lg.bean.BusInfo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.ForeachWriter

object HbaseWriter {
  // An HBase Connection is heavyweight and thread-safe: share ONE per JVM.
  // Table handles are lightweight and NOT thread-safe: hand out one per caller.
  // BUG FIX: the original created a brand-new Connection on every getHtable()
  // call and never closed it, leaking a connection per partition per epoch —
  // contradicting its own comment that the connection should be global.
  private lazy val conn: Connection = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "linux122,linux123")
    ConnectionFactory.createConnection(conf)
  }

  /** Return a Table handle for the GPS table; callers must close() it when done. */
  def getHtable(): Table = {
    // BUG FIX: the table was created as 'htb_gps' but this code opened "htp_gps".
    conn.getTable(TableName.valueOf("htb_gps"))
  }
}
/**
 * ForeachWriter that writes each record's longitude/latitude to HBase.
 * Row key = deployNum + plateNum + timeStr; column family "car_info".
 */
class HbaseWriter extends ForeachWriter[BusInfo] {
  var table: Table = _

  /** Obtain a Table handle for this partition; returning true enables process(). */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // BUG FIX: `val table = ...` declared a local that shadowed the field,
    // leaving the field null and causing an NPE on the first process() call.
    table = HbaseWriter.getHtable()
    true
  }

  /** Write one record as a single Put with lng and lat columns. */
  override def process(value: BusInfo): Unit = {
    // Row key: dispatch number + plate number + timestamp string (unique per reading).
    val rowkey: String = value.deployNum + value.plateNum + value.timeStr
    val put = new Put(Bytes.toBytes(rowkey))
    // lglat is underscore-separated; arr(0) = longitude, arr(1) = latitude
    // (inferred from the column names below — confirm against the producer format).
    val arr: Array[String] = value.lglat.split("_")
    // longitude
    put.addColumn(
      Bytes.toBytes("car_info"),
      Bytes.toBytes("lng"),
      Bytes.toBytes(arr(0))
    )
    // latitude — BUG FIX: the original wrote arr(0) (longitude) into "lat" too.
    put.addColumn(
      Bytes.toBytes("car_info"),
      Bytes.toBytes("lat"),
      Bytes.toBytes(arr(1))
    )
    table.put(put)
  }

  /** Release the Table handle (the shared Connection stays open). */
  override def close(errorOrNull: Throwable): Unit = {
    // Guard against open() having failed before the assignment.
    if (table != null) table.close()
  }
}



    //把经纬度数据写入HBase
        busInfoDs.writeStream
          .foreach(new HbaseWriter)
          .outputMode("append")
          .start()
          .awaitTermination()
		  
 
 4.异常检测

   监听剩余油量小于百分之三十的运输车辆
   写入Kafka
    //3.处理数据
    val kafkaValDf: DataFrame = kafkaDF.selectExpr("CAST(value AS STRING)")
    //转为ds
    val kafkaDs: Dataset[String] = kafkaValDf.as[String]
    //解析出经纬度数据，写入redis
    //封装为一个case class：方便后续获取指定字段的数据
    val busInfoDs: Dataset[BusInfo] = kafkaDs.map(BusInfo(_))
    //把经纬度数据写入redis
    busInfoDs.writeStream
      .foreach(new RedisWriter)
      .outputMode("append")
      .start()
//      .awaitTermination()
    //把经纬度数据写入HBase
        busInfoDs.writeStream
          .foreach(new HbaseWriter)
          .outputMode("append")
          .start()
//          .awaitTermination()
    //实现对车辆异常情况的检测
    val warnInfoDs: Dataset[BusInfo] = busInfoDs.filter(
      info => {
        val remain: String = info.oilRemain
        remain.toInt < 30 //剩余油量小于30%
      }
    )
    //写入到kafka另外一个主题，由web系统监听，然后推送警告信息到车载客户端
    //写出的ds/df中必须有一个列名叫做value
    warnInfoDs.withColumn("value", new Column("deployNum"))
      .writeStream
      .format("kafka")
//      .option("checkpointLocation", "./ck") //ck目录一般选择是hdfs目录
	  .option("checkpointLocation", "hdfs://linux123/ck")
      .option("kafka.bootstrap.servers", "linxu122:9092,linxu123:9092")
      .option("topic", "lg_bus_warn_info")
      .start()
//      .awaitTermination()
    spark.streams.awaitAnyTermination()
	
	提交任务
    报错信息
User class threw exception:
com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map
due to end-of-input
删除之前运行的ck目录（修改了代码之前目录的数据无法匹配所以报错）
spark2-submit 
--class com.lg.monitor.RealTimeProcess \
--master yarn \
--deploy-mode cluster \ 
--executor-memory 1G \
--num-executors 3 \
/root/monitor/bus_monitor.jar;

    System.setProperty("HADOOP_USER_NAME", "root") //权限问题
    注意：
	1)、需要修改CDH中spark2依赖的Kafka版本
	2)、如果ck目录选择HDFS
	可以让HDFS返回主机名称而不是IP地址：直接更改集群的相关配置，然后把hdfs-site.xml引入工程中
    中止任务
	yarn application -kill application_1602905611313_0048